package skymeta

import (
	"fmt"
	"sync"
	"time"

	"gitee.com/simonxie979/skymeta/protocol"

	"github.com/gogo/protobuf/proto"
)

// Package-level dispatch state. The maps and channels are allocated by
// init_Handler.
var (
	// handler maps a protobuf message name to the function that processes
	// framework messages of that name (looked up by onFrameworkMessage).
	handler   map[string]func(msg *protocol.SSMessage)
	frameChan chan *protocol.SSMessage // framework message queue
	srvChan   chan *protocol.SSMessage // service message queue
	respChan  chan *protocol.SSMessage // response message queue
	callMap   map[uint64]*callTimeout  // remote calls awaiting a response, keyed by sequence
	// callMutex guards callMap.
	callMutex sync.Mutex
)

// callTimeout is one callMap entry: the channel a pending call is waiting on
// and the deadline after which timeout_CallMap expires the entry.
type callTimeout struct {
	ch  chan *protocol.SSMessage // receives the response, or the CallTimeout message on expiry
	end time.Time                // deadline; the entry is expired once end is before the ticker time
}

// init_Handler allocates the message queues and the pending-call table,
// registers the framework message handlers by protobuf message name, and
// starts the background goroutines that drain the queues and expire calls.
func init_Handler() {
	frameChan = make(chan *protocol.SSMessage, 100)
	srvChan = make(chan *protocol.SSMessage, 10000)
	respChan = make(chan *protocol.SSMessage, 10000)
	callMap = make(map[uint64]*callTimeout)

	// Framework handlers are keyed by the full protobuf message name.
	handler = map[string]func(msg *protocol.SSMessage){
		proto.MessageName(&protocol.ServiceLaunch{}):   handler_ServiceLaunch,
		proto.MessageName(&protocol.ServiceShutdown{}): handler_ServiceShutdown,
		proto.MessageName(&protocol.ServiceList{}):     handler_ServiceList,
	}

	workers := []func(){
		handler_FrameMessage,
		handler_ServiceMessage,
		handler_ResponseMessage,
		timeout_CallMap,
	}
	for _, worker := range workers {
		go worker()
	}
}

// exit_Handler is the teardown counterpart of init_Handler. It is currently a
// no-op: the goroutines started by init_Handler return on their own once
// g_Context is done.
func exit_Handler() {
}

// handler_FrameMessage Dispatch framework message.
func handler_FrameMessage() {
	for {
		select {
		case <-g_Context.Done():
			return
		case ssMsg := <-frameChan:
			onFrameworkMessage(ssMsg)
		}
	}
}

// handler_ServiceMessage Dispatch service message.
//
// It drains srvChan and forwards each message to the inPipe of the service
// returned by center.GetService for the message's Destination. It returns
// only when g_Context is done.
//
// A message whose destination service cannot be found is logged and skipped;
// it must not stop the loop, otherwise every later service message would be
// left undelivered and srvChan would eventually fill up.
func handler_ServiceMessage() {
	for {
		select {
		case <-g_Context.Done():
			return
		case ssMsg := <-srvChan:
			srv := center.GetService(ssMsg.Destination)
			if srv == nil {
				logger.Errorf("FrameWorkMsg", "onMessage not found service acccording to message. msg: %v", ssMsg)
				// Skip this message only; keep dispatching the rest.
				continue
			}

			// Non-blocking hand-off: a full inPipe drops the message rather
			// than stalling dispatch for every other service.
			select {
			case srv.inPipe <- ssMsg:
			default:
				logger.Warnf("FrameWorkMsg", "service %s inPipe overflow, will drop message: %v", srv.GetServiceName(), ssMsg)
			}
		}
	}
}

// handler_ResponseMessage Process response message.
//
// It drains respChan and, for every response whose sequence is registered in
// callMap, removes the entry and delivers the response on the waiting call's
// channel. Responses with no registered sequence are discarded. It returns
// when g_Context is done.
func handler_ResponseMessage() {
	for {
		select {
		case <-g_Context.Done():
			return
		case ssMsg := <-respChan:
			sequence := ssMsg.GetSequence()

			// Claim the entry under the lock with a single lookup. Removing it
			// here guarantees timeout_CallMap cannot also send on this channel.
			callMutex.Lock()
			call, ok := callMap[sequence]
			if ok {
				delete(callMap, sequence)
			}
			callMutex.Unlock()

			if !ok {
				continue
			}

			// Deliver outside the lock: a receiver that is slow or gone must
			// not block addCall and timeout_CallMap on callMutex.
			select {
			case call.ch <- ssMsg:
			case <-g_Context.Done():
				return
			}
		}
	}
}

// handler_ServiceLaunch handles the notification that another cluster node
// launched a service: it decodes the ServiceLaunch body, builds a remote
// container for the announced service and registers it with center.
// A body that fails to decode is logged and ignored.
func handler_ServiceLaunch(msg *protocol.SSMessage) {
	launch := new(protocol.ServiceLaunch)
	err := proto.Unmarshal(msg.GetBody(), launch)
	if err != nil {
		logger.Errorf("MessageCenter", "unmarshal ServiceLaunch failure. err: %v. msg: %v", err, msg)
		return
	}

	logger.Debugf("MessageCenter", "%X ServiceLaunch %+v", msg.GetSource(), launch)

	info := launch.GetInfo()
	remote := newRemoteContainer(msg.GetSource(), info.GetID(), info.GetName())
	center.RemoteServiceLaunch(remote.GetSessionID(), remote)
}

// handler_ServiceShutdown handles the notification that another cluster node
// shut down a service: it decodes the ServiceShutdown body and reports the
// service ID, together with the message source, to center.
// A body that fails to decode is logged and ignored.
func handler_ServiceShutdown(msg *protocol.SSMessage) {
	shutdown := new(protocol.ServiceShutdown)
	err := proto.Unmarshal(msg.GetBody(), shutdown)
	if err != nil {
		logger.Errorf("MessageCenter", "unmarshal ServiceShutdown failure. err: %v. msg: %v", err, msg)
		return
	}

	source := msg.GetSource()
	logger.Debugf("MessageCenter", "%X ServiceShutdown %+v", source, shutdown)

	center.RemoteServiceShutdown(source, shutdown.GetInfo().GetID())
}

// handler_ServiceList handles the service list announced by another cluster
// node: it decodes the ServiceList body, builds one remote container per
// listed service and hands the whole list to center.
// A body that fails to decode is logged and ignored.
func handler_ServiceList(msg *protocol.SSMessage) {
	announced := new(protocol.ServiceList)
	err := proto.Unmarshal(msg.GetBody(), announced)
	if err != nil {
		logger.Errorf("MessageCenter", "unmarshal ServiceList failure. err: %v. msg: %v", err, msg)
		return
	}

	logger.Debugf("MessageCenter", "%X ServiceList %+v", msg.GetSource(), announced)

	// Stays nil when the peer announces no services.
	var remotes []*container
	for _, info := range announced.GetServiceList() {
		remotes = append(remotes, newRemoteContainer(msg.GetSource(), info.GetID(), info.GetName()))
	}

	logger.Debugf("MessageCenter", "container list: %v", remotes)

	center.RemoteServiceList(msg.GetSource(), remotes)
}

// onFrameworkMessage runs the handler registered for msg's name.
// A message with no registered handler is logged and dropped. A panic raised
// by the handler is recovered and logged so it cannot take down the calling
// goroutine.
func onFrameworkMessage(msg *protocol.SSMessage) {
	defer func() {
		if r := recover(); r != nil {
			logger.Errorf("FrameWorkMsg", "process framework message failure. err: %v, msg: %v", r, msg)
		}
	}()

	if process, found := handler[msg.GetName()]; found {
		process(msg)
		return
	}

	logger.Warnf("FrameWorkMsg", "not found handler according to msg: %v", msg)
}

// onConnect Event of cluster node connect.
//
// It collects the ID and name of every local service into a ServiceList and
// sends it to the node behind sessionID. addr is not used.
//
// If the list cannot be marshalled, the error is logged and nothing is sent:
// a message with an empty body would be decoded by the peer as an empty
// service list rather than being recognised as a failure.
func onConnect(sessionID uint64, addr string) {
	ntc := &protocol.ServiceList{}

	for _, v := range center.GetLocalServices() {
		ntc.ServiceList = append(ntc.ServiceList, &protocol.ServiceInfo{
			ID:   v.GetServiceID(),
			Name: v.GetServiceName(),
		})
	}

	logger.Tracef("FrameWorkMsg", "onConnect list: %+v", ntc)

	data, err := proto.Marshal(ntc)
	if err != nil {
		logger.Errorf("FrameWorkMsg", "onConnect marshal ServieList failure. err: %v", err)
		return
	}

	msg := &protocol.SSMessage{
		Source:      0,
		Destination: 0,
		Sequence:    0,
		Name:        proto.MessageName(ntc),
		Body:        data,
	}

	node.SendMsg(sessionID, msg)
}

// onDisConnect Event of cluster node disconnect.
// It reports the session as offline to center; err is not used.
func onDisConnect(sessionID uint64, err error) {
	center.RemoteOffline(sessionID)
}

// onMessage is the event for a message received from a cluster node, or from
// this node itself. It routes the message to a queue by type: Frame messages
// go to frameChan with Source overwritten by sessionID, Send and Call
// messages go to srvChan, Resp messages go to respChan. Any other type is
// logged as invalid and dropped. Each enqueue is a plain channel send, so it
// blocks while the target queue is full.
func onMessage(sessionID uint64, msg *protocol.SSMessage) {
	kind := msg.GetType()

	if kind == protocol.MessageType_Frame {
		msg.Source = sessionID
		frameChan <- msg
		return
	}

	if kind == protocol.MessageType_Send || kind == protocol.MessageType_Call {
		srvChan <- msg
		return
	}

	if kind == protocol.MessageType_Resp {
		respChan <- msg
		return
	}

	logger.Errorf("MessageCenter", "invalid message from %X. msg: %v", sessionID, msg)
}

// addCall registers ch in callMap under sequence so the response from the
// target service, or a timeout, can be delivered on it. The entry's deadline
// is two seconds from now.
//
// It returns an error, after logging a warning, when sequence is already
// registered; the existing entry is left untouched.
func addCall(sequence uint64, ch chan *protocol.SSMessage) error {
	callMutex.Lock()
	defer callMutex.Unlock()

	if _, exists := callMap[sequence]; !exists {
		callMap[sequence] = &callTimeout{
			ch:  ch,
			end: time.Now().Add(2 * time.Second),
		}
		return nil
	}

	logger.Warnf("MessageCenter", "addCall sequence %v repeated", sequence)
	return fmt.Errorf("call sequence %v repeated", sequence)
}

// timeout_CallMap expires pending calls. Once per second it removes every
// callMap entry whose end deadline is before the tick time and delivers a
// CallTimeout message on that entry's channel. It returns when g_Context is
// done.
//
// The same *protocol.SSMessage value is sent to every timed-out caller, so
// receivers share it.
func timeout_CallMap() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Build the timeout notification once; it is identical for every caller.
	timeoutBody := &protocol.CallTimeout{}
	data, err := proto.Marshal(timeoutBody)
	if err != nil {
		logger.Panicf("CallTimeout", "marshal call timeout message failure. err: %v", err)
	}

	ctMsg := &protocol.SSMessage{
		Name: proto.MessageName(timeoutBody),
		Body: data,
	}

	for {
		select {
		case <-g_Context.Done():
			return
		case now := <-ticker.C:
			// Collect expired entries under the lock. Removing them here
			// guarantees handler_ResponseMessage cannot also send on the
			// same channel.
			var expired []chan *protocol.SSMessage

			callMutex.Lock()
			for sequence, ct := range callMap {
				if ct.end.Before(now) {
					delete(callMap, sequence)
					expired = append(expired, ct.ch)
				}
			}
			callMutex.Unlock()

			// Notify outside the lock: a receiver that is slow or gone must
			// not block addCall and handler_ResponseMessage on callMutex.
			for _, ch := range expired {
				select {
				case ch <- ctMsg:
				case <-g_Context.Done():
					return
				}
			}
		}
	}
}
